package com.uxsino.reactorq.clientserver;

import java.util.List;
import java.util.Objects;
import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.QueueConnection;

import org.reactivestreams.Subscriber;

import com.uxsino.reactorq.commons.QueueFlux;
import com.uxsino.reactorq.event.Event;
import com.uxsino.reactorq.event.IndicatorQueryCmd;
import com.uxsino.reactorq.model.IndicatorValue;
import com.uxsino.reactorq.processor.QueueProcessor;

/**
 * Connects to a collector and runs indicator queries in client/server mode.
 *
 * <p>Usage:
 * <ol>
 *   <li>Create the proxy with a JMS queue connection and an event sender.</li>
 *   <li>Call {@link #createQuery(String, String, List)} to obtain a {@link QueryHandle}.</li>
 *   <li>Call {@code start} on the handle to send the query; the resulting
 *       {@link IndicatorValue}s are delivered through the handle.</li>
 *   <li>Call {@link #dispose()} when the proxy is no longer needed.</li>
 * </ol>
 */
public class QueryProxy {

    /** JMS connection used for replies; closed and set to {@code null} by {@link #dispose()}. */
    private QueueConnection conn;

    /** Source of the {@link IndicatorValue}s received over {@link #conn}. */
    private QueueFlux<IndicatorValue> queueFlux;

    /**
     * Fan-out point handed to every {@link QueryHandle}; all values received from
     * {@link #queueFlux} are pushed into it.
     * NOTE(review): 65535 is presumably the processor's buffer capacity - confirm.
     */
    private final QueueProcessor<IndicatorValue> vProcessor = QueueProcessor.of("px2", 65535);

    /** Sink that receives the {@link IndicatorQueryCmd} events produced by this proxy. */
    private final Subscriber<Event> eventSender;

    /**
     * Creates the proxy, starts the connection and begins forwarding received values.
     *
     * @param conn JMS queue connection on which query results are received; must not be null
     * @param eventSender subscriber that receives the query commands; must not be null
     * @throws JMSException if the connection cannot be started or the receiving flux cannot be set up
     * @throws NullPointerException if {@code conn} or {@code eventSender} is null
     */
    public QueryProxy(QueueConnection conn, Subscriber<Event> eventSender) throws JMSException {
        // Fail fast here: a null sender would otherwise only surface when a query is started.
        this.conn = Objects.requireNonNull(conn, "conn");
        this.eventSender = Objects.requireNonNull(eventSender, "eventSender");
        init();
    }

    /**
     * Starts the connection and wires the incoming value flux into {@link #vProcessor}.
     *
     * @throws JMSException if starting the connection or creating the flux fails
     */
    private void init() throws JMSException {
        conn.start();
        // NOTE(review): the null second argument presumably lets QueueFlux pick/create
        // its own reply queue - confirm against QueueFlux.
        queueFlux = new QueueFlux<IndicatorValue>(conn, null, IndicatorValue.class);
        queueFlux.subscribe(vProcessor::next);
    }

    /**
     * Prepares a query and returns its {@link QueryHandle}. Nothing is sent until the
     * handle triggers the start callback supplied here.
     *
     * @param collectorId which collector to execute on
     * @param neId the network entity to query
     * @param indicatorNames indicators to query; must not be null. The names are copied,
     *        so later changes to the list do not affect the prepared query.
     * @return handle of the query, identified by a freshly generated UUID
     * @throws NullPointerException if {@code indicatorNames} is null
     */
    public QueryHandle createQuery(String collectorId, String neId, List<String> indicatorNames) {
        Objects.requireNonNull(indicatorNames, "indicatorNames");
        // Snapshot now: the callback below runs later, and the caller's list may change by then.
        final String[] indicators = indicatorNames.toArray(new String[0]);
        String queryId = UUID.randomUUID().toString();
        return new QueryHandle(vProcessor, queryId,
            q -> startQuery(q, collectorId, neId, indicators));
    }

    /**
     * Builds the {@link IndicatorQueryCmd} for the given handle and passes it to the event sender.
     *
     * @param handle handle whose query id is put into the command
     * @param collectorId collector that should execute the query
     * @param neId network entity to query
     * @param indicators names of the indicators to query
     */
    private void startQuery(QueryHandle handle, String collectorId, String neId, String[] indicators) {
        IndicatorQueryCmd cmd = new IndicatorQueryCmd();
        cmd.neId = neId;
        cmd.queryId = handle.getQueryId();
        // Each command gets its own array so commands never share mutable state.
        cmd.indicators = indicators.clone();

        // Tell the collector where to send the values back to.
        cmd.replyTo = queueFlux.getQueue();
        cmd.collectorId = collectorId;
        cmd.valueTopicName = queueFlux.getQueueName();
        eventSender.onNext(cmd);
    }

    /**
     * Closes the underlying connection. Safe to call more than once; calls after the
     * first are no-ops.
     */
    public void dispose() {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException ignored) {
            // Best-effort shutdown: the connection is being discarded anyway and the
            // caller has no way to recover from a failed close.
        } finally {
            conn = null;
        }
    }
}